 1.Flink的State--状态原理及原理剖析之广播状态
   
   什么是广播状态？
   所有并行实例，这些实例将它们维持为状态。不广播另一个流的事件，而是将其发送到同一运营商的各
个实例，并与广播流的事件一起处理。
   新的广播状态非常适合需要加入低吞吐量和高吞吐量流或需要动态更新其处理逻辑的应用程序。我们将
使用后一个用例的具体示例来解释广播状态
   广播状态下的动态模式评估
   想象一下，一个电子商务网站将所有用户的交互捕获为用户操作流。运营该网站的公司有兴趣分析交互
以增加收入，改善用户体验，以及检测和防止恶意行为。
   该网站实现了一个流应用程序，用于检测用户事件流上的模式。但是，公司希望每次模式更改时都避免
修改和重新部署应用程序。相反，应用程序在从模式流接收新模式时摄取第二个模式流并更新其活动模
式。在下文中，我们将逐步讨论此应用程序，并展示它如何利用Apache Flink中的广播状态功能。
   我们的示例应用程序摄取了两个数据流。第一个流在网站上提供用户操作，并在上图的左上方显示。用
户交互事件包括操作的类型（用户登录，用户注销，添加到购物车或完成付款）和用户的ID，其由颜色
编码。图示中的用户动作事件流包含用户1001的注销动作，其后是用户1003的支付完成事件，以及用
户1002的“添加到购物车”动作。
   第二流提供应用将执行的动作模式。评估。模式由两个连续的动作组成。在上图中，模式流包含以下两
个：
     1. 模式＃1：用户登录并立即注销而无需浏览电子商务网站上的其他页面。
     2. 模式＃2：用户将商品添加到购物车并在不完成购买的情况下注销。
   这些模式有助于企业更好地分析用户行为，检测恶意行为并改善网站体验。例如，如果项目被添加到购
物车而没有后续购买，网站团队可以采取适当的措施来更好地了解用户未完成购买的原因并启动特定程
序以改善网站转换（如提供折扣代码，限时免费送货优惠等）
   在右侧，该图显示了操作员的三个并行任务，即摄取模式和用户操作流，评估操作流上的模式，并在下
游发出模式匹配。为简单起见，我们示例中的运算符仅评估具有两个后续操作的单个模式。当从模式流
接收到新模式时，替换当前活动模式。原则上，还可以实现运算符以同时评估更复杂的模式或多个模
式，这些模式可以单独添加或移除。
   我们将描述模式匹配应用程序如何处理用户操作和模式流。
   首先，将模式发送给操作员。该模式被广播到运营商的所有三个并行任务。任务将模式存储在其广播状
态中。由于广播状态只应使用广播数据进行更新，因此所有任务的状态始终预期相同。
   接下来，第一个用户操作按用户ID分区并发送到操作员任务。分区可确保同一用户的所有操作都由同一
任务处理。上图显示了操作员任务消耗第一个模式和前三个操作事件后应用程序的状态。
   当任务收到新的用户操作时，它会通过查看用户的最新和先前操作来评估当前活动的模式。对于每个用
户，操作员将先前的操作存储在键控状态。由于上图中的任务到目前为止仅为每个用户收到了一个操作
(我们刚刚启动了应用程序),因此不需要评估该模式。最后，用户键控状态中的先前操作被更新为最
新动作，以便能够在同一用户的下一个动作到达时查找它。
   在处理前三个动作之后，下一个事件（用户1001的注销动作）被运送到处理用户1001的事件的任务。
当任务接收到动作时，它从广播状态中查找当前模式并且用户1001的先前操作。由于模式匹配两个动
作，因此任务发出模式匹配事件。最后，任务通过使用最新操作覆盖上一个事件来更新其键控状态。
   当新模式到达模式流时，它被广播到所有任务，并且每个任务通过用新模式替换当前模式来更新其广播
状态。
   一旦用新模式更新广播状态，匹配逻辑就像之前一样继续，即，用户动作事件由密钥分区并由负责任务
评估。
   如何使用广播状态实现应用程序？
   到目前为止，我们在概念上讨论了该应用程序并解释了它如何使用广播状态来评估事件流上的动态模
式。接下来，我们将展示如何使用Flink的DataStream API和广播状态功能实现示例应用程序。
   让我们从应用程序的输入数据开始。我们有两个数据流，操作和模式。在这一点上，我们并不关心流来
自何处。这些流可以从Apache Kafka或Kinesis或任何其他系统中摄取。并与各两个字段的POJO：
DataStream<Action> actions = ???`
`DataStream<Pattern> patterns = ???
Action``Pattern  
       Action ： Long userId ， String action
       Pattern ：， String firstAction``String secondAction
   作为第一步，我们在属性上键入操作流。接下来，我们准备广播状态。广播状态始终表示为 Flink提供
的最通用的状态原语。由于我们的应用程序一次只评估和存储一个，我们将广播状态配置为具有键类型
和值类型。使用 广播状态，我们在流上应用转换并接收 。在我们获得了keyed Stream和广播流之后，
我们都流式传输并应用了一个 userId
   代码
package com.lagou.state;

public class UserAction {
    private Long userId;
    private String userAction;

    public UserAction(Long userId, String userAction) {
        this.userId = userId;
        this.userAction = userAction;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getUserAction() {
        return userAction;
    }

    public void setUserAction(String userAction) {
        this.userAction = userAction;
    }

    @Override
    public String toString() {
        return "UserAction{" +
                "userId=" + userId +
                ", userAction='" + userAction + '\'' +
                '}';
    }
}

package com.lagou.state;

public class MyPattern {
    private String firstAction;
    private String secondAction;

    public MyPattern() {
    }

    public MyPattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }

    @Override
    public String toString() {
        return "MyPattern{" +
                "firstAction='" + firstAction + '\'' +
                ", secondAction='" + secondAction + '\'' +
                '}';
    }
}

package com.lagou.state;

import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BoradCastDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        //两套数据流，1：用户行为  2：模式
        UserAction ac1 = new UserAction(1001l, "login");
        UserAction ac2 = new UserAction(1003l, "pay");
        UserAction ac3 = new UserAction(1002l, "car");
        UserAction ac4 = new UserAction(1001l, "logout");
        UserAction ac5 = new UserAction(1003l, "car");
        UserAction ac6 = new UserAction(1002l, "logout");
        DataStreamSource<UserAction> actions = env.fromElements(ac1, ac2, ac3, ac4, ac5, ac6);

        MyPattern myPattern1 = new MyPattern("login", "logout");
        MyPattern myPattern2 = new MyPattern("car", "logout");
        DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1);

        KeyedStream<UserAction, Long> keyByed = actions.keyBy(value -> value.getUserId());

        //将模式数据流广播到下游的所有算子
        MapStateDescriptor<Void, MyPattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class));
        BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(bcStateDescriptor);


        SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyByed.connect(broadcastPatterns).process(new PatternEvaluator());
        //将匹配成功的结果输出到控制台
        process.print();
        env.execute();
    }

    public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> {
        ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            //初始化keyedState
            prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastAction", Types.STRING));
        }

        //每来一个Action数据，触发执行
        @Override
        public void processElement(UserAction value, ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            //把用户行为流和模式流中的模式进行匹配
            ReadOnlyBroadcastState<Void, MyPattern> pattrens = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            MyPattern myPattern = pattrens.get(null);
            String prevAction = prevActionState.value();
            if (myPattern != null && prevAction != null) {
                if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) {
                    //如果匹配成...
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern));
                } else {
                    //如果匹配不成功...
                }
            }
            prevActionState.update(value.getUserAction());
        }

        //每一次来一个模式Pattren的时候触发执行
        @Override
        public void processBroadcastElement(MyPattern value, Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            BroadcastState<Void, MyPattern> bcstate = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            bcstate.put(null, value);
        }
    }
}
   
   
       processBroadcastElement() 为广播流的每个记录调用。在我们的 函数中，我们只是使用键将
接收到的记录放入广播状态（记住，我们只存储一个模式）。 PatternEvaluator
       Pattern``null MapState
	   processElement() 为键控流的每个记录调用。它提供对广播状态的只读访问，以防止修改导致
跨函数的并行实例的不同广播状态。 从广播状态检索当前模式的方法和从键控状态检索用户的先
前动作。如果两者都存在，则检查先前和当前操作是否与模式匹配，并且如果是这种情况则发出模
式匹配记录。最后，它将键控状态更新为当前用户操作。
       processElement()``PatternEvaluator
	   onTimer() 在先前注册的计时器触发时调用。定时器可以在任何处理方法中注册，并用于执行计
算或将来清理状态。我们在示例中没有实现此方法以保持代码简洁。但是，当用户在一段时间内未
处于活动状态时，它可用于删除用户的最后一个操作，以避免由于非活动用户而导致状态增长。
   
   您可能已经注意到了处理方法的上下文对象。上下文对象提供对其他功能的访问，例如
KeyedBroadcastProcessFunction
       广播状态（读写或只读，取决于方法），
       A，可以访问记录的时间戳，当前的水印，以及可以注册计时器， TimerService
       当前密钥（仅适用于 ），和 processElement()
       一种将函数应用于每个注册密钥的键控状态的方法（仅适用于） processBroadcastElement()
   在具有就像任何其他ProcessFunction完全进入状态弗林克和时间特性，因此可以用来实现复杂的应用
程序逻辑。广播状态被设计为适应不同场景和用例的多功能特性。虽然我们只讨论了一个相当简单且受
限制的应用程序，但您可以通过多种方式使用广播状态来实现应用程序的要求。
   KeyedBroadcastProcessFunction、
   结论
   在这篇博文中，我们向您介绍了一个示例应用程序，以解释Apache Flink的广播状态以及它如何用于评
估事件流上的动态模式。我们还讨论了API并展示了我们的示例应用程序的源代码。